package raw.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer interceptor that prefixes every outgoing message value with
 * {@link #MSG_PREFIX} and keeps running counts of acknowledged and failed
 * sends, printing the overall success rate when the interceptor is closed.
 *
 * <p>The counters are {@link AtomicInteger}s, so they may be updated from a
 * different thread than the one that calls {@link #close()}.
 */
public class RequestStatCalInterceptor implements ProducerInterceptor<String, String> {
    /** Prefix prepended to the value of every record passing through {@link #onSend}. */
    private static final String MSG_PREFIX = "dhy:";

    /** Number of sends acknowledged without an exception. */
    private final AtomicInteger successCnt = new AtomicInteger(0);

    /** Number of sends that completed with an exception. */
    private final AtomicInteger errorCnt = new AtomicInteger(0);

    /**
     * Returns a copy of the record whose value carries {@link #MSG_PREFIX}.
     *
     * <p>Topic, partition, timestamp, key and headers are carried over unchanged.
     * Records with a {@code null} value are passed through untouched so that
     * tombstones are not turned into the literal string {@code "dhy:null"}.
     *
     * @param msg the record handed to the producer
     * @return the record to actually send
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> msg) {
        if (msg.value() == null) {
            return msg;
        }
        // Use the constructor that accepts headers; the 5-argument form would drop them.
        return new ProducerRecord<>(
                msg.topic(),
                msg.partition(),
                msg.timestamp(),
                msg.key(),
                MSG_PREFIX + msg.value(),
                msg.headers());
    }

    /**
     * Records the outcome of a send.
     *
     * <p>The outcome is decided by {@code exception}: the Kafka contract states it is
     * {@code null} only when no error occurred, whereas {@code metadata} may be
     * non-null even for a failed send.
     *
     * @param metadata  metadata of the record that was sent; not used for the decision
     * @param exception the error raised while sending, or {@code null} on success
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCnt.getAndIncrement();
        } else {
            errorCnt.getAndIncrement();
        }
    }

    /**
     * Prints the send success rate as a percentage to standard output.
     *
     * <p>When no acknowledgements were recorded the rate is undefined, so
     * {@code N/A} is printed instead of dividing by zero (which would yield NaN).
     */
    @Override
    public void close() {
        // Snapshot each counter once so numerator and denominator are consistent.
        int success = successCnt.get();
        int total = success + errorCnt.get();
        if (total == 0) {
            System.out.println("消息发送成功率：N/A");
            return;
        }
        double successRate = (double) success / total;
        System.out.println("消息发送成功率：" + successRate * 100 + "%");
    }

    /**
     * No configuration is needed by this interceptor.
     *
     * @param configs producer configuration; ignored
     */
    @Override
    public void configure(Map<String, ?> configs) {
        // Intentionally empty.
    }
}
